package test;

import com.mongodb.client.MongoCollection;
import kafka.KafkaHandle;
import kafka.LogMap;
import mongo.DateUtil;
import mongo.MongoTest;
import net.sf.json.JSONObject;
import org.apache.log4j.Logger;
import org.bson.Document;

import java.util.Date;
import java.util.List;

/**
 * Adds data received from Kafka to Mongo.
 *
 * USER: lintc 【lintiancong@zhuojianchina.com】
 * DATE: 2017-08-09 15:09
 */
public class KafkaToMongo implements KafkaHandle {

    /** Class-wide logger; final so the shared reference cannot be reassigned. */
    private static final Logger LOG = Logger.getLogger(KafkaToMongo.class);

    /**
     * Looks up the records stored in {@code logMap} under {@code time} and, when a
     * list is present, logs the topic together with the number of records.
     *
     * <p>NOTE(review): this method only logs; it does not call
     * {@link #add(String, List, MongoCollection)} — confirm whether that is intended.
     *
     * @param logMap source of the buffered records, queried via {@code getList(time)}
     * @param topic  topic name, used only in the log message
     * @param time   key passed to {@code logMap.getList}
     */
    @Override
    public void handle(LogMap logMap, String topic, String time) {
        List<JSONObject> list = logMap.getList(time);
        if (null != list) {
            LOG.info(topic + " : " + list.size());
        }
    }

    /**
     * Inserts the given records into the collection by delegating to
     * {@code MongoTest.insertList(collection, list)}.
     *
     * @param time       currently unused by this method; kept so existing callers
     *                   continue to compile
     * @param list       records to insert; passed through unchanged
     * @param collection target Mongo collection; passed through unchanged
     */
    public void add(String time, List<JSONObject> list, MongoCollection<Document> collection) {
        // The former start/end timestamps fed only a commented-out log statement,
        // so they were dead code and have been removed.
        MongoTest.insertList(collection, list);
    }
}
